package importer

import (
	"context"
	"encoding/json"
	"fmt"

	"dash/collectors"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
)

// TopicHandler is the signature of a function that handles a single sarama
// consumer message. It receives the caller's context and the message, and
// returns a non-nil error when the message could not be handled.
type TopicHandler func(ctx context.Context, msg *sarama.ConsumerMessage) error
// NewHeartbeatTopicHandler returns a TopicHandler that ignores its arguments
// and always reports success. The logger argument is unused.
func NewHeartbeatTopicHandler(_ log.Logger) TopicHandler {
	// Neither the context nor the message is inspected, so the parameters
	// are left unnamed.
	var noop TopicHandler = func(context.Context, *sarama.ConsumerMessage) error {
		return nil
	}
	return noop
}

// NewMessageTopicHandler returns a TopicHandler that decodes each message
// value as JSON into a collectors.MessageReport and passes the result to
// repository.SaveMessage. The logger argument is unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveMessage is returned unchanged so callers can keep matching
// repository errors directly.
func NewMessageTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding message report: nil consumer message")
		}
		var report collectors.MessageReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding message report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveMessage(ctx, report)
	}
}

// NewCollectorJoinChatroomTopicHandler returns a TopicHandler that decodes
// each message value as JSON into a collectors.CollectorJoinChatroomReport
// and passes the result to repository.SaveCollectorJoinChatroom. The logger
// argument is unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveCollectorJoinChatroom is returned unchanged so callers can
// keep matching repository errors directly.
func NewCollectorJoinChatroomTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding collector join chatroom report: nil consumer message")
		}
		var report collectors.CollectorJoinChatroomReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding collector join chatroom report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveCollectorJoinChatroom(ctx, report)
	}
}

// NewCollectorLeaveChatroomTopicHandler returns a TopicHandler that decodes
// each message value as JSON into a collectors.CollectorLeaveChatroomReport
// and passes the result to repository.SaveCollectorLeaveChatroom. The logger
// argument is unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveCollectorLeaveChatroom is returned unchanged so callers can
// keep matching repository errors directly.
func NewCollectorLeaveChatroomTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding collector leave chatroom report: nil consumer message")
		}
		var report collectors.CollectorLeaveChatroomReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding collector leave chatroom report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveCollectorLeaveChatroom(ctx, report)
	}
}

// NewMemberJoinChatroomTopicHandler returns a TopicHandler that decodes each
// message value as JSON into a collectors.MemberJoinChatroomReport and
// passes the result to repository.SaveMemberJoinChatroom. The logger
// argument is unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveMemberJoinChatroom is returned unchanged so callers can
// keep matching repository errors directly.
func NewMemberJoinChatroomTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding member join chatroom report: nil consumer message")
		}
		var report collectors.MemberJoinChatroomReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding member join chatroom report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveMemberJoinChatroom(ctx, report)
	}
}

// NewMemberLeaveChatroomTopicHandler returns a TopicHandler that decodes
// each message value as JSON into a collectors.MemberLeaveChatroomReport and
// passes the result to repository.SaveMemberLeaveChatroom. The logger
// argument is unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveMemberLeaveChatroom is returned unchanged so callers can
// keep matching repository errors directly.
func NewMemberLeaveChatroomTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding member leave chatroom report: nil consumer message")
		}
		var report collectors.MemberLeaveChatroomReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding member leave chatroom report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveMemberLeaveChatroom(ctx, report)
	}
}

// NewChatroomMemberTopicHandler returns a TopicHandler that decodes each
// message value as JSON into a collectors.ChatroomMemberReport and passes
// the result to repository.SaveChatroomMember. The logger argument is
// unused.
//
// The returned handler reports an error for a nil message. When
// json.Unmarshal cannot decode the value, the error names the message's
// topic, partition and offset and wraps the decode error with %w, so
// errors.Is and errors.As still reach it. The error from
// repository.SaveChatroomMember is returned unchanged so callers can keep
// matching repository errors directly.
func NewChatroomMemberTopicHandler(_ log.Logger, repository Repository) TopicHandler {
	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Guard first: reading msg.Value on a nil message would panic.
		if msg == nil {
			return fmt.Errorf("decoding chatroom member report: nil consumer message")
		}
		var report collectors.ChatroomMemberReport
		if err := json.Unmarshal(msg.Value, &report); err != nil {
			return fmt.Errorf("decoding chatroom member report (topic %q, partition %d, offset %d): %w",
				msg.Topic, msg.Partition, msg.Offset, err)
		}
		return repository.SaveChatroomMember(ctx, report)
	}
}
